package com.bwie.realtime.jtp.dwd.log.function;

import com.bwie.realtime.jtp.dwd.log.bean.PageViewBean;
import org.apache.commons.lang3.time.FastDateFormat;

import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.xml.stream.events.StartDocument;

public class PageViewWindowFuncition implements WindowFunction<PageViewBean, String, String, TimeWindow>{


//    格式化实例
    private FastDateFormat format  =  FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");

    @Override
    public void apply(String key, TimeWindow Window,
                      Iterable<PageViewBean> input, Collector<String> collector) throws Exception {
//       获取窗口开始和结束时间
        String windowStartTime =  format.format(Window.getStart());
        String windowEndTime = format.format(Window.getEnd());
//        对窗口中数据计算
        Long pvCount=0L;
        Long pvDuringTime=0L;
        Long uvCount =0L;
        Long sessionCount=0L;
        for(PageViewBean pageViewBean: input){
            pvCount+= pageViewBean.getPvCount();
            pvDuringTime+= pageViewBean.getPvDuringTime();
            uvCount+= pageViewBean.getUvCount();
            sessionCount+= pageViewBean.getSessionCount();
        }
//        输出
        String output=windowStartTime+","+windowEndTime+","+windowStartTime.substring(0,10)+","
                +key+","
                +uvCount+","+sessionCount+","+pvCount+","+pvDuringTime;

        collector.collect(output);



    }

}


